/*
 * CUdpMediaCaster.cpp
 *
 *  Created on: 2016-04-23
 *      Author: terry
 */

#include "CUdpMediaCaster.h"
#include "CLog.h"
#include "ClockTime.h"
#include <limits>
#include <algorithm>
#include <errno.h>
#include "RtpHeader.h"
#include "TFileUtil.h"
#include "TStringUtil.h"
#include "RtpPort.h"

#include "H264RtpPackager.h"
#include "H265RtpPackager.h"
#include "CRtpPackager.h"
#include "AppConst.h"
#include "RtpConst.h"


/// Stack size handed to setStackSize() by the CUdpMediaCaster constructor.
/// The value is 2 * 1024 * 1024 (presumably bytes, i.e. 2 MiB -- confirm against setStackSize()).
static const size_t THREAD_STACK_SIZE = 2 * 1024 * 1024;


namespace av
{

/// Builds an idle caster: no socket is opened and no thread is started here.
///
/// Both packagers start out as generic CRtpPackager instances with
/// MEDIA_CODEC_NONE (data for the main stream, audio for the audio stream).
/// The main sequence number and the SSRC start at 1; every other counter is
/// value-initialised. The thread stack size is set to THREAD_STACK_SIZE.
CUdpMediaCaster::CUdpMediaCaster()
    : m_packetCount()
    , m_lastTime()
    , m_packager(new CRtpPackager(MEDIA_TYPE_DATA, MEDIA_CODEC_NONE))
    , m_seq(1)
    , m_ssrc(1)
    , m_payload()
    , m_initTime(0)
    , m_lastPktTime()
    , m_delayCount()
    , m_id()
    , m_audioPackager(new CRtpPackager(MEDIA_TYPE_AUDIO, MEDIA_CODEC_NONE))
    , m_audioPayload()
    , m_audioSeq()
{
    setStackSize(THREAD_STACK_SIZE);
}

/// Destructor: runs close() so the caster is shut down before the object goes away.
CUdpMediaCaster::~CUdpMediaCaster()
{
	close();
}


/// Opens the UDP socket, selects the packager for the main stream and starts
/// the caster via start().
///
/// @param ip        address stored as the local address returned by getLocalAddr().
///                  NOTE(review): openRtpSession() binds with m_addr.m_ip, which is
///                  only updated from this argument after the bind has succeeded --
///                  confirm that binding with the previous/empty address is intended.
/// @param port      local port to bind; 0 asks RtpPort::make() for one.
/// @param codec     codec id passed to createPackager() for the main stream.
/// @param clockRate not used by this function.
/// @param payload   RTP payload type of the main stream.
/// @param ssrc      SSRC written into every outgoing RTP header.
/// @return 0 on success, ENOENT when no port could be bound.
int CUdpMediaCaster::open(const std::string& ip, uint16_t port, int codec, int clockRate, int payload, int ssrc)
{
    CLog::info("CUdpMediaCaster::open. ip: %s, port: %d, fmt.codec:%d\n", ip.c_str(), port, codec);

    if (port == 0)
    {
        port = RtpPort::make();
    }

    // Try the current port first; after each failed bind fetch another port
    // from RtpPort, for at most RtpPort::count() + 1 attempts in total.
    size_t attempt = 0;
    while ((attempt < (RtpPort::count() + 1)) && !openRtpSession(port))
    {
        port = RtpPort::make();
        ++attempt;
    }

    if (!m_socket.isOpen())
    {
        return ENOENT;
    }

    m_addr.set(ip, port);
    m_payload = payload;
    m_ssrc = ssrc;

    m_packager.reset(createPackager(codec));
    m_packager->setPayload(m_payload);

    start();

    return 0;
}

/// Reports whether the caster's socket is currently open.
bool CUdpMediaCaster::isOpen()
{
    return m_socket.isOpen();
}

/// Shuts the caster down: calls stop() (presumably stops the sender thread --
/// confirm against the base class), then closes the socket and drops all
/// packets still waiting in the queue.
void CUdpMediaCaster::close()
{
    stop();

    closeRtpSession();

    clearCache();
}


/// Stores a caller-chosen identifier; this file only stores and returns it.
void CUdpMediaCaster::setID(int id)
{
	m_id = id;
}

/// Returns the identifier last stored with setID() (value-initialised by the constructor).
int CUdpMediaCaster::getID()
{
	return m_id;
}


/// Returns a copy of m_addr, i.e. the ip/port pair recorded by open() after a successful bind.
NetAddress CUdpMediaCaster::getLocalAddr()
{
    return m_addr;
}


bool CUdpMediaCaster::addTarget(const std::string& ip, uint16_t port)
{
	NetAddress addr(ip, port);

    bool done = false;
    comn::AutoCritSec lock(m_cs);
    RtpTargetArray::iterator it = std::find(m_targets.begin(), m_targets.end(), addr);
    if (it == m_targets.end())
    {
        m_targets.push_back(addr);

        done = true;
    }

	CLog::info("CUdpMediaCaster::addTarget. ip: %s, port: %d. done: %d\n", ip.c_str(), port, done);

    return done;
}

void CUdpMediaCaster::removeTarget(const std::string& ip, uint16_t port)
{
	NetAddress addr(ip, port);

    comn::AutoCritSec lock(m_cs);
    RtpTargetArray::iterator it = std::remove(m_targets.begin(), m_targets.end(), addr);
    if (it != m_targets.end())
    {
        m_targets.erase(it);
    }
}

/// Returns the number of registered targets, read while holding m_cs.
size_t CUdpMediaCaster::getTargetCount()
{
    comn::AutoCritSec lock(m_cs);
    return m_targets.size();
}

/// Removes every registered target while holding m_cs.
/// Performs the same operation as removeTargets().
void CUdpMediaCaster::removeAllTarget()
{
	comn::AutoCritSec lock(m_cs);
	m_targets.clear();
}

/// Returns a copy of the target stored at position idx.
///
/// @param idx zero-based position in the target list.
/// @return the target, or a default-constructed NetAddress when idx is out of range.
NetAddress CUdpMediaCaster::getTargetAt(size_t idx)
{
    comn::AutoCritSec lock(m_cs);

    if (idx >= m_targets.size())
    {
        return NetAddress();
    }
    return m_targets[idx];
}

/// Removes every registered target while holding m_cs.
/// Performs the same operation as removeAllTarget().
void CUdpMediaCaster::removeTargets()
{
    comn::AutoCritSec lock(m_cs);
    m_targets.clear();

}

/// Reconfigures the audio path for a new media format.
///
/// Replaces the audio packager with one created for fmt.m_audioCodec and
/// selects the audio payload type: 8 for G.711 A-law and 0 for G.711 u-law
/// (the static RTP payload types for PCMA/PCMU), otherwise the main stream's
/// payload type plus one.
///
/// @param fmt media format; only m_audioCodec is read here.
void CUdpMediaCaster::onMediaFormat(const MediaFormat& fmt)
{
    m_audioPackager.reset(createPackager(fmt.m_audioCodec));

    switch (fmt.m_audioCodec)
    {
    case MEDIA_CODEC_G711A:
        m_audioPayload = 8;
        break;
    case MEDIA_CODEC_G711U:
        m_audioPayload = 0;
        break;
    default:
        m_audioPayload = m_payload + 1;
        break;
    }

    m_audioPackager->setPayload(m_audioPayload);
}

/// Queues a media packet for the sender loop in run().
///
/// Null and empty packets are dropped. The null check mirrors the guard in
/// sendPacket(MediaPacketPtr&, double) and prevents dereferencing an unset
/// pointer handed in by a caller.
///
/// @param pkt packet to enqueue.
void CUdpMediaCaster::onMediaPacket(MediaPacketPtr& pkt)
{
    if (!pkt || pkt->empty())
    {
        return;
    }

    //CLog::debug("pkt. size:%d, ts:%I64d\n", pkt->size, pkt->ts);

    m_pktQueue.push(pkt);
}

/// Media event callback; intentionally a no-op, the event value is ignored.
void CUdpMediaCaster::onMediaEvent(int event)
{
    /// pass
}

/// Sender loop: pops queued media packets and sends them until m_canExit is set.
///
/// Each pop waits with a timeout argument of 1000 (presumably milliseconds --
/// confirm against the queue type) so the exit flag is re-checked regularly.
/// The pacing factor handed to sendPacket() is 0.8 times what m_casterDelay
/// computes from the queue length remaining after the pop.
///
/// @return always 0.
int CUdpMediaCaster::run()
{
    while (!m_canExit)
    {
        MediaPacketPtr pkt;
        if (m_pktQueue.pop(pkt, 1000))
        {
            size_t backlog = m_pktQueue.size();
            double delayRate = m_casterDelay.computeDelay(backlog) * 0.8;

            // A backlog above CasterDelay::kQueueMaxSize * 2 used to log a
            // "queue is flood" warning; that log is disabled.

            //assert(m_targets.size() > 0);
            sendPacket(pkt, delayRate);
        }
    }
    return 0;
}

/// Stop hook (presumably invoked by stop() of the thread base class -- confirm).
/// Cancels a pending wait on the packet queue so run() can observe the exit
/// flag, and posts m_eventDelay. Note that the pacing wait in sendPacket()
/// currently uses timedwait() rather than m_eventDelay, so the post does not
/// interrupt it.
void CUdpMediaCaster::doStop()
{
    m_pktQueue.cancelWait();

    m_eventDelay.post();
}

/// Receives one sliced RTP payload and forwards it to sendPacket(const RtpPacket&).
/// Presumably called back by the packagers, which are handed `this` in slice().
void CUdpMediaCaster::onSlicePacket(const RtpPacket& pkt)
{
    sendPacket(pkt);
}


/// Opens the datagram socket if necessary and binds it to the given local port.
///
/// The send buffer is set to AppConst::SEND_BUFFER_SIZE, falling back to half
/// of that when the first request is rejected. The bind address is taken from
/// m_addr.m_ip as it is at the time of the call. On a failed bind the socket
/// is closed again.
///
/// @param port local port to bind.
/// @return true when the socket is bound, false otherwise.
bool CUdpMediaCaster::openRtpSession(uint16_t port)
{
    if (!m_socket.isOpen())
    {
        m_socket.open(SOCK_DGRAM);
    }

    // Best effort: a failure of the reduced size is ignored as well.
    const bool fullSizeAccepted = m_socket.setSendBufferSize(AppConst::SEND_BUFFER_SIZE);
    if (!fullSizeAccepted)
    {
        m_socket.setSendBufferSize(AppConst::SEND_BUFFER_SIZE / 2);
    }

    comn::SockAddr localAddr(m_addr.m_ip, port);
    if (m_socket.bind(localAddr) == 0)
    {
        return true;
    }

    m_socket.close();
    return false;
}

/// Closes the socket; does nothing when it is not open.
void CUdpMediaCaster::closeRtpSession()
{
    if (!m_socket.isOpen())
    {
        return;
    }

    m_socket.close();
}

/// Discards every packet still waiting in the send queue.
void CUdpMediaCaster::clearCache()
{
    m_pktQueue.clear();
}

/// Slices one media packet into RTP packets, sends them, then paces the sender.
///
/// Audio packets go through m_audioPackager, everything else through
/// m_packager; the packager reports each slice back through `this`.
/// Afterwards the function waits for
///     limitDelay((pts - m_lastPktTime) * delayRate / 50 - time spent slicing)
/// when that value is positive.
///
/// NOTE(review): m_lastPktTime is only assigned here while it is <= 0 and is
/// not advanced anywhere in this file, so the pts difference is measured from
/// that latched value rather than from the previous packet -- confirm that
/// this is intended.
///
/// @param packet    media packet; null or empty packets are ignored.
/// @param delayRate pacing factor computed by run() from the queue length.
void CUdpMediaCaster::sendPacket(MediaPacketPtr& packet, double delayRate)
{
    if (!packet || packet->empty())
    {
        return;
    }

    MediaPacket& media = *packet;

    if (m_lastPktTime <= 0)
    {
        m_lastPktTime = media.pts;
    }

    const int64_t sliceStart = comn::ClockTime::getTime();

    const bool isAudio = (media.type == MEDIA_TYPE_AUDIO);
    if (isAudio)
    {
        m_audioPackager->slice(media, av::RtpConst::RTP_PACKET_SIZE, this);
    }
    else
    {
        m_packager->slice(media, av::RtpConst::RTP_PACKET_SIZE, this);
    }

    const int32_t duration = static_cast<int32_t>(media.pts - m_lastPktTime);
    const int32_t elapse = static_cast<int32_t>(comn::ClockTime::getTime() - sliceStart);
    const int32_t delay = static_cast<int32_t>(duration * delayRate / 50);

    // Slicing and sending already took longer than the intended delay.
    if ((delay > 0) && (delay < elapse))
    {
        CLog::debug("CUdpMediaCaster sendPacket(%d). delay: %d, elapse:%d\n",
            media.size, delay, elapse);
    }

    int32_t waitTime = delay - elapse;
    waitTime = m_casterDelay.limitDelay(waitTime);
    if (waitTime > 0)
    {
        //m_eventDelay.timedwait(delay);
        timedwait(waitTime);
    }

    m_delayCount = 0;
}

void CUdpMediaCaster::sendPacket(const RtpPacket& pkt)
{
    //CLog::info("CUdpMediaCaster::sendPacket. length:%d, pkt.ts:%u. \n", pkt.size, pkt.ts);

    if (m_initTime == 0)
    {
        m_initTime = 3600 + (std::numeric_limits<uint32_t>::max() - pkt.ts);
    }

    comn::AutoCritSec lock(m_cs);

    m_buffer.clear();
    m_buffer.ensure(pkt.size + sizeof(RTP_FIXED_HEADER));

    RTP_FIXED_HEADER header;
    memset(&header, 0, sizeof(header));
    header.version = 2;
    header.marker = pkt.mark;

    header.timestamp = htonl(m_initTime + pkt.ts);

    header.ssrc = htonl(m_ssrc);

	if (pkt.pt == m_payload)
	{
		header.seq_no = htons(m_seq ++);
	}
	else
	{
		header.seq_no = htons(m_audioSeq ++);
	}

    header.payload = pkt.pt;

    m_buffer.write((char*)&header, sizeof(header));

    m_buffer.write(pkt.data, pkt.size);

	if (m_targets.empty())
	{
		CLog::warning("CUdpMediaCaster::sendPacket. no target\n");
	}

    for (size_t i = 0; i < m_targets.size(); ++ i)
    {
        NetAddress& addr = m_targets[i];
        comn::SockAddr sockAddr(addr.m_ip, addr.m_port);
        int ret = m_socket.sendTo((char*)m_buffer.data(), m_buffer.size(), 0, sockAddr);
        if (ret != (int)m_buffer.size())
        {
            m_socket.checkWritable(100);
            CLog::warning("sendTo %s:%d failed. seq:%d\n",
                addr.m_ip.c_str(), addr.m_port, header.seq_no);
        }
    }
}

/// Blocks the calling thread for up to `ms` milliseconds by calling select()
/// on the caster's socket with a timeout.
///
/// @param ms timeout in milliseconds; the caller in this file only passes positive values.
void CUdpMediaCaster::timedwait(int ms)
{
    //uint64_t tmStart = comn::ClockTime::getTime();

    // The socket handle is the only descriptor select() watches.
    fd_set setEx;
    FD_ZERO(&setEx);
    FD_SET(m_socket.getHandle(), &setEx);

    // Split milliseconds into whole seconds and the remainder in microseconds.
    timeval tv = { ms / 1000, (ms % 1000) * 1000 };

    // Non-Windows: the socket is watched for exceptional conditions only, so
    // the call normally runs until the timeout expires.
    // WIN32: the same fd_set is passed as both the write set and the exception set.
    // NOTE(review): a writable socket may make the WIN32 call return before
    // the timeout -- confirm that this is intended.
#ifdef WIN32
    select((int) m_socket.getHandle() + 1, NULL, &setEx, &setEx, &tv);
#else
    select((int) m_socket.getHandle() + 1, NULL, NULL, &setEx, &tv);
#endif //

    //int elapse = (int)(comn::ClockTime::getTime() - tmStart);
    //if (elapse > (ms + 2))
    //{
    //    CLog::debug("CUdpMediaCaster::timedwait. elapse: %d, raw:%d\n", elapse, ms);
    //}
}

/// Creates the RTP packager matching a codec.
///
/// H.264 and HEVC get their dedicated packagers. PS and every unrecognised
/// codec get a generic data packager, and G.711 A-law/u-law get a generic
/// audio packager.
///
/// @param codec codec id (a MediaCodec value passed as int).
/// @return a newly allocated packager; the caller takes ownership.
RtpPackager* CUdpMediaCaster::createPackager(int codec)
{
    switch (codec)
    {
    case MEDIA_CODEC_H264:
        return new H264RtpPackager();

    case MEDIA_CODEC_HEVC:
        return new H265RtpPackager();

    case MEDIA_CODEC_PS:
        return new CRtpPackager(MEDIA_TYPE_DATA, MEDIA_CODEC_PS);

    case MEDIA_CODEC_G711A:
    case MEDIA_CODEC_G711U:
        return new CRtpPackager(MEDIA_TYPE_AUDIO, static_cast<MediaCodec>(codec));

    default:
        return new CRtpPackager(MEDIA_TYPE_DATA, static_cast<MediaCodec>(codec));
    }
}


} /* namespace av */
